深度理解 AWS SQS 与 SNS:从原理到生产实战
用了十几年消息队列,从 ActiveMQ 到 RabbitMQ 到 Kafka,再到 AWS 的 SQS/SNS,我最大的感受是:消息系统看起来简单,真正用好需要对底层机制有深刻理解。这篇文章不讲入门操作,专门讲那些文档里不会告诉你、但在生产环境中会让你半夜被叫醒的东西。
一、SQS 的本质:你以为的队列和它实际的样子
很多人把 SQS 当成一个简单的 FIFO 管道来用,这是第一个认知错误。SQS Standard Queue 本质上是一个分布式的消息存储系统,它的设计目标是无限吞吐和极高可用,为此牺牲了严格有序和精确一次投递。
1.1 Standard Queue 的内部架构
SQS Standard Queue 在 AWS 内部是多个服务器组成的集群,消息写入时会被复制到多个节点。这意味着:
- 消息可能被投递多次:因为分布式复制,同一条消息可能存在于多个节点,不同的消费者可能从不同节点拉到同一条消息。这不是 bug,是设计。
- 消息顺序不保证:消息分散在多个节点上,消费者拉取时从不同节点获取,自然无法保证全局顺序。
- 吞吐几乎无限:正因为是分布式的,没有单点瓶颈,每秒处理几万甚至几十万条消息都不是问题。
# 生产环境必须处理重复消息 — 幂等性设计
import hashlib
import boto3
import redis
sqs = boto3.client('sqs')
r = redis.Redis()
def process_message(msg):
# 用消息内容生成唯一指纹
body = msg['Body']
msg_hash = hashlib.sha256(body.encode()).hexdigest()
# Redis SETNX 实现幂等:处理过的消息直接跳过
lock_key = f"sqs:processed:{msg_hash}"
if not r.set(lock_key, "1", nx=True, ex=86400):
print(f"重复消息,跳过: {msg_hash[:16]}")
return True # 返回 True 表示可以删除
try:
# 实际业务处理
do_business_logic(body)
return True
except Exception as e:
# 处理失败,删除幂等锁,允许重试
r.delete(lock_key)
raise
1.2 FIFO Queue 的代价
FIFO Queue 解决了顺序和精确一次的问题,但代价很大:
- 吞吐限制:每个 Message Group ID 每秒最多 300 条消息(开启高吞吐模式可到 3000 条/秒,但有条件)
- 高吞吐模式的陷阱:开启后,同一个 Message Group 内的消息在批量发送时可能不严格有序。也就是说,你为了吞吐牺牲了 FIFO 最核心的卖点
- 成本更高:FIFO 队列的请求费用是 Standard 的约 1.2 倍
# FIFO Queue 的正确使用方式:按业务实体分组
import json
import uuid
def send_order_event(sqs_client, queue_url, order_id, event_type, data):
"""
关键:MessageGroupId 按 order_id 分组
同一个订单的事件严格有序,不同订单之间并行处理
"""
return sqs_client.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps({
'order_id': order_id,
'event_type': event_type,
'data': data,
'timestamp': int(time.time() * 1000)
}),
MessageGroupId=str(order_id), # 同一订单的事件有序
MessageDeduplicationId=str(uuid.uuid4()) # 或用内容去重
)
# 错误示范:所有消息用同一个 GroupId
# 这会导致整个队列变成单线程,吞吐暴跌
# MessageGroupId="all-messages" # 千万别这么干
1.3 我的选型建议
经过这些年的实践,我的选型原则很简单:
| 场景 | 选择 | 原因 |
|---|---|---|
| 异步任务(发邮件、生成报表) | Standard Queue | 不需要顺序,重复处理也无害 |
| 订单状态流转 | FIFO Queue | 同一订单的状态变更必须有序 |
| 日志收集 | Standard Queue | 丢一条无所谓,吞吐是关键 |
| 支付回调 | FIFO Queue + 幂等 | 不能重复扣款,顺序也重要 |
| 每秒超过 3000 条且需要有序 | Kinesis 或 Kafka | SQS FIFO 扛不住 |
二、Visibility Timeout — 最容易出事的配置
Visibility Timeout 是 SQS 最核心也最容易配错的参数。消费者拉取消息后,这条消息对其他消费者"隐藏"一段时间,这就是 Visibility Timeout。如果在这段时间内消费者没有删除消息,消息会重新变为可见,被其他消费者再次拉取。
2.1 设太短:消息被重复处理
假设你的业务处理需要 30 秒,但 Visibility Timeout 设了 15 秒。消息处理到一半,就被另一个消费者拉走了,两个消费者同时处理同一条消息。这在支付场景下就是灾难。
2.2 设太长:故障恢复慢
如果消费者进程崩溃了,消息要等到 Visibility Timeout 过期才能被重新处理。设了 12 小时?那这条消息就要等 12 小时。
2.3 正确做法:动态延长
import threading
import boto3
class SQSHeartbeat:
"""
消息心跳机制:处理过程中持续延长 Visibility Timeout
类似于分布式锁的续期机制
"""
def __init__(self, sqs_client, queue_url, receipt_handle,
timeout=30, interval=10):
self.sqs = sqs_client
self.queue_url = queue_url
self.receipt_handle = receipt_handle
self.timeout = timeout
self.interval = interval
self._stop = threading.Event()
self._thread = None
def start(self):
self._thread = threading.Thread(target=self._heartbeat, daemon=True)
self._thread.start()
return self
def _heartbeat(self):
while not self._stop.wait(self.interval):
try:
self.sqs.change_message_visibility(
QueueUrl=self.queue_url,
ReceiptHandle=self.receipt_handle,
VisibilityTimeout=self.timeout
)
except Exception as e:
print(f"心跳续期失败: {e}")
break
def stop(self):
self._stop.set()
if self._thread:
self._thread.join(timeout=5)
# 使用方式
def process_with_heartbeat(sqs, queue_url, message):
heartbeat = SQSHeartbeat(sqs, queue_url, message['ReceiptHandle'])
heartbeat.start()
try:
# 即使处理 5 分钟也不会超时
result = long_running_task(message['Body'])
sqs.delete_message(
QueueUrl=queue_url,
ReceiptHandle=message['ReceiptHandle']
)
return result
finally:
heartbeat.stop()
经验法则:Visibility Timeout 设为预期处理时间的 6 倍,同时用心跳机制兜底。比如预期 10 秒处理完,Timeout 设 60 秒,心跳每 20 秒续一次。
三、Dead Letter Queue — 不是垃圾桶,是救命稻草
DLQ(死信队列)是消息处理失败后的最后一道防线。消息被消费指定次数(maxReceiveCount)后仍未被删除,就会被转移到 DLQ。很多团队配了 DLQ 就不管了,这是大错特错。
3.1 DLQ 的正确运维姿势
import json
import boto3
from datetime import datetime
class DLQMonitor:
"""
DLQ 监控与自动重驱动
生产环境必须有 DLQ 告警 + 定期巡检
"""
def __init__(self, region='ap-southeast-1'):
self.sqs = boto3.client('sqs', region_name=region)
self.cloudwatch = boto3.client('cloudwatch', region_name=region)
def get_dlq_depth(self, dlq_url):
"""获取 DLQ 中积压的消息数"""
resp = self.sqs.get_queue_attributes(
QueueUrl=dlq_url,
AttributeNames=['ApproximateNumberOfMessages',
'ApproximateNumberOfMessagesNotVisible']
)
attrs = resp['Attributes']
visible = int(attrs['ApproximateNumberOfMessages'])
in_flight = int(attrs['ApproximateNumberOfMessagesNotVisible'])
return visible + in_flight
def redrive_messages(self, dlq_url, target_url, batch_size=10, max_messages=100):
"""
将 DLQ 中的消息重新驱动到原始队列
注意:要先修复导致失败的 bug,再重驱动
"""
redrived = 0
while redrived < max_messages:
resp = self.sqs.receive_message(
QueueUrl=dlq_url,
MaxNumberOfMessages=min(batch_size, max_messages - redrived),
WaitTimeSeconds=1
)
messages = resp.get('Messages', [])
if not messages:
break
for msg in messages:
# 发送到原始队列
self.sqs.send_message(
QueueUrl=target_url,
MessageBody=msg['Body'],
MessageAttributes=msg.get('MessageAttributes', {})
)
# 从 DLQ 删除
self.sqs.delete_message(
QueueUrl=dlq_url,
ReceiptHandle=msg['ReceiptHandle']
)
redrived += 1
print(f"重驱动完成: {redrived} 条消息")
return redrived
3.2 DLQ 配置的关键参数
{
"RedrivePolicy": {
"deadLetterTargetArn": "arn:aws:sqs:ap-southeast-1:123456789:my-queue-dlq",
"maxReceiveCount": 3
}
}
maxReceiveCount 怎么设?
- 设 1:消息失败一次就进 DLQ。适合对延迟敏感、有完善人工处理流程的场景
- 设 3-5:给瞬时故障(网络抖动、下游短暂不可用)自动恢复的机会。这是大多数场景的推荐值
- 设 10+:基本没意义,如果重试 10 次还失败,说明不是瞬时问题
DLQ 本身也要配 DLQ 吗?不需要。DLQ 的消息应该由运维人员或自动化脚本处理,不应该再有"死信的死信"。但 DLQ 的消息保留期要设长一些(建议 14 天),给你足够的时间排查和修复。
四、Long Polling vs Short Polling — 省钱的关键
这是一个很多人忽略但直接影响成本的配置。
Short Polling(默认):每次 ReceiveMessage 立即返回,即使队列为空也算一次请求。如果你的消费者每秒轮询一次,一天就是 86400 次请求,一个月 259 万次。乘以消费者数量,费用很可观。
Long Polling:设置 WaitTimeSeconds(最大 20 秒),队列为空时请求会挂起等待,有消息到达时立即返回。这样空闲时段的请求次数大幅减少。
# 错误:Short Polling,疯狂烧钱
while True:
resp = sqs.receive_message(QueueUrl=queue_url) # 立即返回
if not resp.get('Messages'):
time.sleep(1) # 自己加 sleep 也不如 Long Polling 优雅
# 正确:Long Polling
while True:
resp = sqs.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=10, # 一次最多拉 10 条
WaitTimeSeconds=20, # 最多等 20 秒
MessageAttributeNames=['All'] # 拉取所有自定义属性
)
messages = resp.get('Messages', [])
for msg in messages:
process_and_delete(msg)
实测数据:一个中等流量的系统(日均 50 万条消息),从 Short Polling 切到 Long Polling 后,SQS 请求费用降低了约 60%。
五、SNS — 不只是"发通知"
很多人对 SNS 的理解停留在"发短信、发邮件",这严重低估了它。SNS 的核心价值是扇出(Fan-out):一条消息发布到 Topic,多个订阅者同时收到。这是事件驱动架构的基石。
5.1 SNS + SQS Fan-out 模式
这是 AWS 上最经典的解耦模式,没有之一:
┌──→ SQS: order-processing → 订单服务
│
用户下单 → SNS Topic ├──→ SQS: inventory-update → 库存服务
│
├──→ SQS: notification-send → 通知服务
│
└──→ SQS: analytics-collect → 数据分析
为什么不直接让订单服务调用其他服务的 API?因为:
- 解耦:订单服务不需要知道有多少下游消费者,新增消费者只需要订阅 Topic
- 容错:库存服务挂了不影响通知服务,每个 SQS 队列独立消费
- 削峰:秒杀场景下,SNS 瞬间扇出到多个 SQS,每个服务按自己的速度消费
- 可追溯:每个队列都有独立的 DLQ,哪个环节出问题一目了然
5.2 消息过滤 — SNS 的杀手锏
SNS 支持在订阅级别设置过滤策略,只有匹配的消息才会投递到对应的 SQS。这避免了消费者收到大量无关消息再自己过滤的浪费。
import boto3
import json
sns = boto3.client('sns')
# 创建订阅时设置过滤策略
sns.subscribe(
TopicArn='arn:aws:sns:ap-southeast-1:123456789:order-events',
Protocol='sqs',
Endpoint='arn:aws:sqs:ap-southeast-1:123456789:high-value-orders',
Attributes={
'FilterPolicy': json.dumps({
# 只接收金额大于 1000 的订单事件
'order_amount': [{'numeric': ['>=', 1000]}],
# 且事件类型是 created 或 paid
'event_type': ['order_created', 'order_paid'],
# 且地区是华东或华南
'region': [{'prefix': 'east-'}, {'prefix': 'south-'}]
}),
# 基于消息体过滤(而非消息属性)
'FilterPolicyScope': 'MessageBody'
}
)
# 发布消息时带上属性
sns.publish(
TopicArn='arn:aws:sns:ap-southeast-1:123456789:order-events',
Message=json.dumps({
'order_id': 'ORD-20250225-001',
'amount': 2500,
'items': ['MacBook Pro', 'AirPods']
}),
MessageAttributes={
'event_type': {'DataType': 'String', 'StringValue': 'order_created'},
'order_amount': {'DataType': 'Number', 'StringValue': '2500'},
'region': {'DataType': 'String', 'StringValue': 'east-shanghai'}
}
)
过滤策略支持的操作符:
- 精确匹配:
["order_created"] - 前缀匹配:
[{"prefix": "east-"}] - 数值范围:
[{"numeric": [">=", 100, "<=", 1000]}] - 存在性检查:
[{"exists": true}] - 否定匹配:
[{"anything-but": "test"}]
合理使用过滤策略,可以减少 70% 以上的无效消息投递,直接省钱。
5.3 SNS 的投递重试策略
SNS 向 SQS 投递失败时会自动重试,但不同协议的重试策略差异很大:
- SQS/Lambda:立即重试 3 次,几乎不会失败(都在 AWS 内网)
- HTTP/HTTPS:分 4 个阶段共重试 38 次,持续约 23 分钟。可以自定义重试策略
- Email/SMS:不重试,失败就丢了
// HTTP 端点的自定义重试策略
{
"healthyRetryPolicy": {
"numRetries": 10,
"numNoDelayRetries": 2,
"minDelayTarget": 5,
"maxDelayTarget": 60,
"numMinDelayRetries": 3,
"numMaxDelayRetries": 5,
"backoffFunction": "exponential"
}
}
六、生产级消费者架构
写一个 while True 循环拉消息谁都会,但生产环境的消费者要考虑的东西多得多。
6.1 批量处理 + 并发控制
import boto3
import concurrent.futures
import signal
import sys
class ProductionConsumer:
"""
生产级 SQS 消费者
特性:优雅关闭、批量拉取、并发处理、错误隔离
"""
def __init__(self, queue_url, handler, workers=5, region='ap-southeast-1'):
self.sqs = boto3.client('sqs', region_name=region)
self.queue_url = queue_url
self.handler = handler
self.workers = workers
self.running = True
# 注册信号处理,支持优雅关闭
signal.signal(signal.SIGTERM, self._shutdown)
signal.signal(signal.SIGINT, self._shutdown)
def _shutdown(self, signum, frame):
print(f"收到信号 {signum},准备优雅关闭...")
self.running = False
def _process_single(self, message):
"""处理单条消息,异常不影响其他消息"""
try:
self.handler(message)
self.sqs.delete_message(
QueueUrl=self.queue_url,
ReceiptHandle=message['ReceiptHandle']
)
return True
except Exception as e:
print(f"处理失败 MessageId={message['MessageId']}: {e}")
# 不删除消息,让它在 Visibility Timeout 后重新可见
return False
def run(self):
print(f"消费者启动,{self.workers} 个工作线程")
with concurrent.futures.ThreadPoolExecutor(max_workers=self.workers) as executor:
while self.running:
# 批量拉取
resp = self.sqs.receive_message(
QueueUrl=self.queue_url,
MaxNumberOfMessages=10,
WaitTimeSeconds=20,
MessageAttributeNames=['All'],
AttributeNames=['All']
)
messages = resp.get('Messages', [])
if not messages:
continue
# 并发处理
futures = {
executor.submit(self._process_single, msg): msg
for msg in messages
}
# 等待本批次全部完成再拉下一批
concurrent.futures.wait(futures, timeout=300)
print("消费者已优雅关闭")
# 使用
def handle_order(message):
data = json.loads(message['Body'])
print(f"处理订单: {data['order_id']}")
consumer = ProductionConsumer(
queue_url='https://sqs.ap-southeast-1.amazonaws.com/123456789/orders',
handler=handle_order,
workers=10
)
consumer.run()
6.2 Lambda 作为消费者的注意事项
SQS 触发 Lambda 是最常见的 Serverless 模式,但有几个坑:
# Lambda SQS 触发器的正确写法
def lambda_handler(event, context):
"""
关键点:
1. event['Records'] 是批量的,可能有 1-10 条消息
2. 部分成功时要用 batchItemFailures 报告失败的消息
3. 不要在 Lambda 里调用 delete_message,成功的消息会自动删除
"""
batch_item_failures = []
for record in event['Records']:
try:
body = json.loads(record['body'])
process_order(body)
except Exception as e:
print(f"处理失败: {record['messageId']}, 错误: {e}")
batch_item_failures.append({
'itemIdentifier': record['messageId']
})
# 返回失败的消息 ID,只有这些消息会被重试
# 如果不返回这个,整批消息都会重试(包括已成功的)
return {
'batchItemFailures': batch_item_failures
}
Lambda + SQS 的关键配置:
- BatchSize:一次给 Lambda 多少条消息。CPU 密集型任务设小(1-3),IO 密集型可以设大(10)
- MaximumBatchingWindowInSeconds:最多等多久凑一批。设 5 秒可以提高批处理效率,但增加延迟
- FunctionResponseTypes:必须包含 "ReportBatchItemFailures",否则部分失败会导致整批重试
- 并发控制:SQS 会自动扩展 Lambda 并发数(最多 1000),用 ReservedConcurrency 限制,防止打爆下游
七、延迟队列与定时任务
SQS 支持两种延迟机制,很多人搞混:
- Queue Delay(DelaySeconds):队列级别,所有消息入队后延迟 N 秒才可见。最大 15 分钟
- Message Delay(DelaySeconds per message):消息级别,单条消息延迟。Standard Queue 支持,FIFO Queue 不支持消息级别延迟
# 场景:订单 30 分钟未支付自动取消
def create_order(order_id):
# 1. 创建订单
save_order(order_id, status='pending')
# 2. 发送延迟消息,30 分钟后检查支付状态
sqs.send_message(
QueueUrl=TIMEOUT_CHECK_QUEUE,
MessageBody=json.dumps({
'order_id': order_id,
'action': 'check_payment_timeout',
'created_at': int(time.time())
}),
DelaySeconds=900 # 15 分钟(SQS 最大值)
)
# 如果需要 30 分钟,怎么办?见下面的方案
def handle_timeout_check(message):
data = json.loads(message['Body'])
order = get_order(data['order_id'])
elapsed = int(time.time()) - data['created_at']
if order['status'] == 'paid':
return # 已支付,不需要取消
if elapsed < 1800: # 还没到 30 分钟
# 重新入队,继续等待(阶梯延迟)
remaining = 1800 - elapsed
delay = min(remaining, 900) # 最多再延迟 15 分钟
sqs.send_message(
QueueUrl=TIMEOUT_CHECK_QUEUE,
MessageBody=message['Body'],
DelaySeconds=delay
)
return
# 超时,取消订单
cancel_order(data['order_id'])
超过 15 分钟延迟的解决方案:
- 阶梯延迟(如上代码):消息到期后检查是否真的超时,没超时就重新入队。简单但有额外的 SQS 请求成本
- DynamoDB TTL + Stream:把定时任务写入 DynamoDB 并设置 TTL,过期时触发 Stream → Lambda。精确度在分钟级别
- EventBridge Scheduler:AWS 原生定时调度,支持一次性和周期性任务,精确到秒。这是目前最推荐的方案
八、消息体设计 — 被忽视的架构决策
消息体怎么设计,直接影响系统的可维护性和扩展性。
8.1 胖消息 vs 瘦消息
// 胖消息:把所有数据都放在消息里
{
"event": "order_created",
"order": {
"id": "ORD-001",
"user_id": "U-123",
"user_name": "张三",
"user_email": "[email]",
"items": [...],
"total_amount": 2500,
"shipping_address": {...},
"payment_method": {...}
}
}
// 瘦消息:只放引用,消费者自己查
{
"event": "order_created",
"order_id": "ORD-001",
"timestamp": 1740000000
}
我的建议:用"适度胖"的消息。
- 包含消费者处理所需的核心数据,避免消费者再回查数据库(减少延迟和数据库压力)
- 不包含可能频繁变化的数据(如用户昵称),这些数据在消费时可能已经过期
- SQS 消息体最大 256KB,如果超了,把数据放 S3,消息里放 S3 的 key
8.2 大消息处理
# 超过 256KB 的消息:SQS Extended Client 模式
import boto3
import json
import uuid
s3 = boto3.client('s3')
sqs = boto3.client('sqs')
BUCKET = 'my-sqs-large-messages'
def send_large_message(queue_url, data):
body = json.dumps(data)
if len(body.encode('utf-8')) > 200000: # 留点余量
# 存 S3
s3_key = f"sqs-messages/{uuid.uuid4()}.json"
s3.put_object(Bucket=BUCKET, Key=s3_key, Body=body)
# 消息里放引用
sqs.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps({
'_large_message': True,
'_s3_bucket': BUCKET,
'_s3_key': s3_key
})
)
else:
sqs.send_message(QueueUrl=queue_url, MessageBody=body)
def receive_large_message(message):
body = json.loads(message['Body'])
if body.get('_large_message'):
# 从 S3 读取实际内容
resp = s3.get_object(Bucket=body['_s3_bucket'], Key=body['_s3_key'])
return json.loads(resp['Body'].read())
return body
九、监控与告警 — 生产环境的眼睛
消息系统最怕的不是宕机,是消息悄悄积压没人知道。以下是必须配置的监控指标:
9.1 核心监控指标
# Terraform 配置 SQS 关键告警
# 1. 队列积压告警 — 最重要的指标
resource "aws_cloudwatch_metric_alarm" "sqs_backlog" {
alarm_name = "sqs-order-queue-backlog"
comparison_operator = "GreaterThanThreshold"
evaluation_periods = 3
metric_name = "ApproximateNumberOfMessagesVisible"
namespace = "AWS/SQS"
period = 300
statistic = "Maximum"
threshold = 1000 # 根据业务调整
alarm_description = "订单队列积压超过 1000 条"
dimensions = {
QueueName = "order-processing"
}
alarm_actions = [aws_sns_topic.alerts.arn]
}
# 2. 消息年龄告警 — 消息在队列里待了多久
resource "aws_cloudwatch_metric_alarm" "sqs_message_age" {
alarm_name = "sqs-order-queue-age"
comparison_operator = "GreaterThanThreshold"
evaluation_periods = 2
metric_name = "ApproximateAgeOfOldestMessage"
namespace = "AWS/SQS"
period = 300
statistic = "Maximum"
threshold = 3600 # 最老的消息超过 1 小时
alarm_description = "订单队列最老消息超过 1 小时未处理"
dimensions = {
QueueName = "order-processing"
}
alarm_actions = [aws_sns_topic.alerts.arn]
}
# 3. DLQ 非空告警 — DLQ 有消息就要告警
resource "aws_cloudwatch_metric_alarm" "dlq_not_empty" {
alarm_name = "sqs-order-dlq-not-empty"
comparison_operator = "GreaterThanThreshold"
evaluation_periods = 1
metric_name = "ApproximateNumberOfMessagesVisible"
namespace = "AWS/SQS"
period = 60
statistic = "Sum"
threshold = 0
alarm_description = "订单 DLQ 中有失败消息,需要人工介入"
dimensions = {
QueueName = "order-processing-dlq"
}
alarm_actions = [aws_sns_topic.alerts.arn]
}
9.2 自定义业务指标
# 在消费者中埋点,推送自定义指标到 CloudWatch
import boto3
import time
cloudwatch = boto3.client('cloudwatch')
def publish_consumer_metrics(queue_name, processing_time, success):
cloudwatch.put_metric_data(
Namespace='Custom/SQS',
MetricData=[
{
'MetricName': 'MessageProcessingTime',
'Value': processing_time,
'Unit': 'Milliseconds',
'Dimensions': [
{'Name': 'QueueName', 'Value': queue_name}
]
},
{
'MetricName': 'MessageProcessingSuccess' if success
else 'MessageProcessingFailure',
'Value': 1,
'Unit': 'Count',
'Dimensions': [
{'Name': 'QueueName', 'Value': queue_name}
]
}
]
)
十、安全最佳实践
10.1 队列访问策略
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowSNSToSendMessage",
"Effect": "Allow",
"Principal": {"Service": "sns.amazonaws.com"},
"Action": "sqs:SendMessage",
"Resource": "arn:aws:sqs:ap-southeast-1:123456789:order-queue",
"Condition": {
"ArnEquals": {
"aws:SourceArn": "arn:aws:sns:ap-southeast-1:123456789:order-topic"
}
}
},
{
"Sid": "DenyNonSSL",
"Effect": "Deny",
"Principal": "*",
"Action": "sqs:*",
"Resource": "arn:aws:sqs:ap-southeast-1:123456789:order-queue",
"Condition": {
"Bool": {"aws:SecureTransport": "false"}
}
}
]
}
10.2 加密
- 传输加密:SQS 默认使用 HTTPS,已经加密
- 静态加密:开启 SSE-SQS(免费)或 SSE-KMS(可以控制密钥权限)
- 敏感数据:不要在消息体里放密码、密钥等敏感信息。如果必须传递,先用 KMS 加密,消费者端解密
十一、成本优化实战
SQS/SNS 按请求次数计费,优化的核心就是减少请求次数。
| 优化手段 | 效果 | 实现方式 |
|---|---|---|
| Long Polling | 减少 60% 空轮询 | WaitTimeSeconds=20 |
| 批量发送 | 减少 90% 发送请求 | SendMessageBatch,一次最多 10 条 |
| 批量删除 | 减少 90% 删除请求 | DeleteMessageBatch |
| SNS 消息过滤 | 减少无效投递 | FilterPolicy |
| 合并小消息 | 减少总请求数 | 应用层攒批后发送 |
# 批量操作示例:发送和删除都用 Batch
def batch_send(sqs, queue_url, messages):
"""批量发送,一次最多 10 条"""
entries = []
for i, msg in enumerate(messages):
entries.append({
'Id': str(i),
'MessageBody': json.dumps(msg)
})
if len(entries) == 10:
resp = sqs.send_message_batch(
QueueUrl=queue_url, Entries=entries
)
_check_failures(resp)
entries = []
if entries:
resp = sqs.send_message_batch(
QueueUrl=queue_url, Entries=entries
)
_check_failures(resp)
def batch_delete(sqs, queue_url, messages):
"""批量删除已处理的消息"""
entries = [
{'Id': str(i), 'ReceiptHandle': msg['ReceiptHandle']}
for i, msg in enumerate(messages)
]
if entries:
resp = sqs.delete_message_batch(
QueueUrl=queue_url, Entries=entries
)
_check_failures(resp)
def _check_failures(resp):
failed = resp.get('Failed', [])
if failed:
print(f"批量操作部分失败: {failed}")
十二、真实故障案例复盘
案例一:消息风暴导致 Lambda 并发打满
现象:上游突然发了 10 万条消息到 SQS,Lambda 并发瞬间飙到 1000(账号默认上限),其他 Lambda 函数全部无法执行。
根因:SQS 触发 Lambda 时会自动扩展并发,没有设置 ReservedConcurrency。
解决:
- 给 SQS 触发的 Lambda 设置 ReservedConcurrency=100
- 给关键业务 Lambda 也设置 ReservedConcurrency,确保不被挤占
- 在 SQS 和 Lambda 之间加一层限流逻辑
案例二:FIFO 队列吞吐骤降
现象:FIFO 队列平时每秒处理 200 条消息,某天突然降到每秒 10 条。
根因:有一个消费者处理某条消息时卡住了(下游超时),这条消息的 MessageGroupId 对应的整个分组都被阻塞。而恰好 80% 的消息都用了同一个 GroupId。
解决:
- 重新设计 MessageGroupId,按业务实体(如 user_id)分组,而不是按消息类型
- 给消费者加超时控制和心跳机制
- 监控单个 MessageGroup 的积压情况
案例三:DLQ 消息丢失
现象:DLQ 里的消息莫名其妙消失了,但没有人处理过。
根因:DLQ 的消息保留期(MessageRetentionPeriod)用了默认的 4 天,而运维团队的 DLQ 巡检周期是每周一次。消息在被发现之前就过期删除了。
解决:DLQ 的 MessageRetentionPeriod 设为 14 天(最大值),同时配置 DLQ 非空告警,有消息立即通知。
十三、SQS/SNS vs 其他消息服务的选择
| 维度 | SQS/SNS | Kinesis | MSK (Kafka) | EventBridge |
|---|---|---|---|---|
| 运维成本 | 零运维 | 低(Serverless 模式) | 高(要管集群) | 零运维 |
| 吞吐 | Standard 无限 / FIFO 3000/s | 每分片 1MB/s | 极高 | 有限 |
| 消息保留 | 最多 14 天 | 最多 365 天 | 无限 | 不保留 |
| 消息回溯 | 不支持 | 支持 | 支持 | 可归档到 S3 |
| 消费模型 | 拉取,消费即删 | 拉取,基于位移 | 拉取,基于 offset | 推送 |
| 适用场景 | 任务队列、解耦 | 实时流处理 | 大数据管道 | 事件路由 |
选型口诀:任务解耦用 SQS,广播通知用 SNS,实时流用 Kinesis,大数据用 Kafka,事件路由用 EventBridge。不要用一个服务解决所有问题,组合使用才是正道。
总结
SQS 和 SNS 看起来是 AWS 最简单的服务之一,但要在生产环境中用好,需要理解它们的分布式本质。核心原则:
- 永远假设消息会重复,做好幂等
- Visibility Timeout 要配合心跳机制
- DLQ 不是配了就完事,要监控、要告警、要有处理流程
- Long Polling + 批量操作是省钱的关键
- SNS 的消息过滤能力被严重低估,用好了能省大量计算资源
- 监控三板斧:队列深度、消息年龄、DLQ 非空告警
- 消费者要有优雅关闭、并发控制、错误隔离的能力
这些经验都是从真实故障中总结出来的,希望能帮你少踩一些坑。
留言板
留言提交后需管理员审核通过才会显示